热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

连续性|量词_FlinkCEPFlink的复杂事件处理

篇首语:本文由编程笔记#小编为大家整理,主要介绍了FlinkCEP-Flink的复杂事件处理相关的知识,希望对你有一定的参考价值。1FlinkCEP

篇首语:本文由编程笔记#小编为大家整理,主要介绍了Flink CEP - Flink的复杂事件处理相关的知识,希望对你有一定的参考价值。



1 Flink CEP 是什么

FlinkCEP - Flink的复杂事件处理。它可以让你在无限事件流中检测出特定的事件模型,有机会掌握数据中重要的那部分


2 Flink CEP 特点

目标:从有序的简单事件流中发现一些高阶特征
输入:一个或多个由简单事件构成的事件流
处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
输出:满足规则的复杂事件


3 Flink CEP 应用场景

风险控制:对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。
策略营销:用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。
运维监控:灵活配置多指标、多依赖来实现更复杂的监控模式。


4 Flink CEP原理

Flink CEP内部是用NFA(非确定有限自动机)来实现的,由点和边组成的一个状态图,以一个初始状态作为起点,经过一系列的中间状态,达到终态。点分为起始状态、中间状态、最终状态三种,边分为take、ignore、proceed三种。

take:必须存在一个条件判断,当到来的消息满足take边条件判断时,把这个消息放入结果集,将状态转移到下一状态。
ignore:当消息到来时,可以忽略这个消息,将状态自旋在当前不变,是一个自己到自己的状态转移。
proceed:又叫做状态的空转移,当前状态可以不依赖于消息到来而直接转移到下一状态。
模式API (Pattern API)

模式API可以让你定义想从输入流中抽取的复杂模式序列

个体模式:组成复杂规则的每一个单独的模式定义,就是“个体模式”

A 量词

在FlinkCEP中,你可以通过这些方法指定循环模式:


  • pattern.oneOrMore() 指定期望一个给定事件出现一次或者多次的模式
  • pattern.times(#fTimes) 指定期望一个给定事件出现特定次数的模式,例如出现4次A
  • pattern.times(#fromTimes, #toTimes) 指定期望一个给定事件出现次数在一个最小值和最大值中间的模式,比如出现2-4次A
  • pattern.greedy() 方法让循环模式变成贪心的
  • pattern.optional() 方法让所有的模式变成可选的,不管是否是循环模式
    // 期望出现4次start.times(4)// 期望出现0或者4次
  • start.times(4).optional()// 期望出现2、3或者4次 -
  • start.times(2, 4)// 期望出现2、3或者4次,并且尽可能的重复次数多
  • start.times(2, 4).greedy()// 期望出现0、2、3或者4次
  • start.times(2, 4).optional()// 期望出现0、2、3或者4次,并且尽可能的重复次数多
  • start.times(2, 4).optional().greedy()// 期望出现1到多次
  • start.oneOrMore()// 期望出现1到多次,并且尽可能的重复次数多
  • start.oneOrMore().greedy()// 期望出现0到多次
  • start.oneOrMore().optional()// 期望出现0到多次,并且尽可能的重复次数
  • start.oneOrMore().optional().greedy()// 期望出现2到多次
  • start.timesOrMore(2)// 期望出现2到多次,并且尽可能的重复次数
  • start.timesOrMore(2).greedy()// 期望出现0、2或多次
  • start.timesOrMore(2).optional()// 期望出现0、2或多次,并且尽可能的重复次数多
  • start.timesOrMore(2).optional().greedy()B 条件

对每个模式你可以指定一个条件来决定一个进来的事件是否被接受进入这个模式,指定判断事件属性的条件可以通过


  • pattern.where() 增加新的条件,多个条件应该同时满足 pattern.where(event => … /*
    一些判断条件 */)

  • pattern.or() 增加新的条件,多个条件满足任意一个都可以

  • pattern.where(event =>
    … /* 一些判断条件 /) .or(event => … / 替代条件 */)

  • pattern.until()
    为循环模式指定一个停止条件。意思是满足了给定的条件的事件出现后,就不会再有事件被接受进入模式了。如"(a+ until b)"
    (一个或者更多的"a"直到"b")

  • pattern.oneOrMore().until(event => … /* 替代条件
    */)以上这些可以是可迭代的

  • subtype(subClass) 为当前模式定义一个子类型条件。一个事件只有是这个子类型的时候才能匹配到模式

  • pattern.subtype(classOf[SubEvent]) oneOrMore()
    指定模式期望匹配到的事件至少出现一次。默认(在子事件间)使用松散的内部连续性。关于内部连续性的更多信息可以参考连续性。提示:推荐使用until()或者within()来清理状态。

  • pattern.oneOrMore() timesOrMore(#times)
    指定模式期望匹配到的事件正好出现的次数。默认(在子事件间)使用松散的内部连续性。关于内部连续性的更多信息可以参考连续性。

  • pattern.timesOrMore(2) optional()
    指定这个模式是可选的,也就是说,它可能根本不出现。这对所有之前提到的量词都适用。

  • pattern.oneOrMore().optional() greedy()
    指定这个模式是贪心的,也就是说,它会重复尽可能多的次数。这只对量词适用,现在还不支持模式组。

  • pattern.oneOrMore().greedy()迭代条件 :
    这是最普遍的条件类型。使用它可以指定一个基于前面已经被接受的事件的属性或者它们的一个子集的统计数据来决定是否接受时间序列的条件。

  • pattern.oneOrMore() .subtype(classOf[SubEvent]) .where( (value, ctx)
    => lazy val sum = ctx.getEventsForPattern(“middle”).map(_.getPrice).sum
    value.getName.startsWith(“foo”) && sum &#43; value.getPrice <5.0 )组合模式:很多个体模式组合起来&#xff0c;就形成了整个的模式序列

FlinkCEP支持事件之间如下形式的连续策略

// 严格连续
val strict: Pattern[Event, _] &#61; start.next("event").where(...)
// 松散连续
val relaxed: Pattern[Event, _] &#61; start.followedBy("event").where(...)
// 不确定的松散连续
val nonDetermin: Pattern[Event, _] &#61; start.followedByAny("event").where(...)
// 严格连续的NOT模式
val strictNot: Pattern[Event, _] &#61; start.notNext("not").where(...)
// 松散连续的NOT模式
val relaxedNot: Pattern[Event, _] &#61; start.notFollowedBy("not").where(...)

松散连续意味着跟着的事件中&#xff0c;只有第一个可匹配的事件会被匹配上&#xff0c;而不确定的松散连接情况下&#xff0c;有着同样起始的多个匹配会被输出。举例来说&#xff0c;模式"a b"&#xff0c;给定事件序列"a"&#xff0c;“c”&#xff0c;“b1”&#xff0c;“b2”&#xff0c;会产生如下的结果&#xff1a;

"a"和"b"之间严格连续&#xff1a; &#xff08;没有匹配&#xff09;&#xff0c;"a"之后的"c"导致"a"被丢弃。

"a"和"b"之间松散连续&#xff1a;a b1&#xff0c;松散连续会”跳过不匹配的事件直到匹配上的事件”。

"a"和"b"之间不确定的松散连续&#xff1a;a b1, a b2&#xff0c;这是最常见的情况。

也可以为模式定义一个有效时间约束。例如&#xff0c;你可以通过pattern.within()方法指定一个模式应该在10秒内发生。这种时间模式支持处理时间和事件时间.

next.within(Time.seconds(10))循环模式中的连续性

对于循环模式&#xff08;例如oneOrMore()和times())&#xff09;&#xff0c;默认是松散连续。如果想使用严格连续&#xff0c;你需要使用consecutive()方法明确指定&#xff0c; 如果想使用不确定松散连续&#xff0c;你可以使用allowCombinations()方法。

consecutive

与oneOrMore()和times()一起使用&#xff0c; 在匹配的事件之间施加严格的连续性&#xff0c; 也就是说&#xff0c;任何不匹配的事件都会终止匹配&#xff08;和next()一样&#xff09;。如果不使用它&#xff0c;那么就是松散连续&#xff08;和followedBy()一样&#xff09;

Pattern.begin("start")
.where(_.getName().equals("c"))
.followedBy("middle")
.where(_.getName().equals("a"))
.oneOrMore()
.consecutive()
.followedBy("end1")
.where(_.getName().equals("b"))

输入&#xff1a;C D A1 A2 A3 D A4 B&#xff0c;会产生下面的输出&#xff1a;如果施加严格连续性&#xff1a; C A1 B&#xff0c;C A1 A2 B&#xff0c;C A1 A2 A3 B不施加严格连续性&#xff1a; C A1 B&#xff0c;C A1 A2 B&#xff0c;C A1 A2 A3 B&#xff0c;C A1 A2 A3 A4 BallowCombinations

与oneOrMore()和times()一起使用&#xff0c; 在匹配的事件中间施加不确定松散连续性&#xff08;和followedByAny()一样&#xff09;。如果不使用&#xff0c;就是松散连续&#xff08;和followedBy()一样&#xff09;

Pattern.begin("start")
.where(_.getName().equals("c"))
.followedBy("middle")
.where(_.getName().equals("a"))
.oneOrMore()
.allowCombinations()
.followedBy("end1").where(_.getName().equals("b"))

输入&#xff1a;C D A1 A2 A3 D A4 B&#xff0c;会产生如下的输出&#xff1a;如果使用不确定松散连续&#xff1a; C A1 B&#xff0c;C A1 A2 B&#xff0c;C A1 A3 B&#xff0c;C A1 A4 B&#xff0c;C A1 A2 A3 B&#xff0c;C A1 A2 A4 B&#xff0c;C A1 A3 A4 B&#xff0c;C A1 A2 A3 A4 B如果不使用&#xff1a;C A1 B&#xff0c;C A1 A2 B&#xff0c;C A1 A2 A3 B&#xff0c;C A1 A2 A3 A4 B5 匹配后跳过策略

对于一个给定的模式&#xff0c;同一个事件可能会分配到多个成功的匹配上。为了控制一个事件会分配到多少个匹配上&#xff0c;你需要指定跳过策略AfterMatchSkipStrategy。有五种跳过策略&#xff0c;如下&#xff1a;

NO_SKIP: 每个成功的匹配都会被输出。
SKIP_TO_NEXT: 丢弃以相同事件开始的所有部分匹配。
SKIP_PAST_LAST_EVENT: 丢弃起始在这个匹配的开始和结束之间的所有部分匹配。
SKIP_TO_FIRST: 丢弃起始在这个匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配。
SKIP_TO_LAST: 丢弃起始在这个匹配的开始和最后一个出现的名称为PatternName事件之间的所有部分匹配。

val skipStrategy &#61; AfterMatchSkipStrategy.skipToFirst("patternName").throwExceptionOnMiss()Pattern.begin("patternName", skipStrategy)

例如&#xff0c;给定一个模式b&#43; c和一个数据流b1 b2 b3 c&#xff0c;不同跳过策略之间的不同如下&#xff1a;


5 模式的检测

在指定了要寻找的模式后&#xff0c;该把它们应用到输入流上来发现可能的匹配了。为了在事件流上运行你的模式&#xff0c;需要创建一个PatternStream。给定一个输入流input&#xff0c;一个模式pattern和一个可选的用来对使用事件时间时有同样时间戳或者同时到达的事件进行排序的比较器comparator

object Cep
def main(args: Array[String]): Unit &#61;
val env: StreamExecutionEnvironment &#61; StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1)
val loginDS: DataStream[LoginInfo] &#61; env.fromElements("") .map(data &#61;>
val datas: Array[String] &#61; data.split(",") LoginInfo(datas(0), datas(1), datas(2).toInt,datas(3).toLong) )
// 创建规则
val pattern: Pattern[LoginInfo, LoginInfo] &#61; Pattern.begin[LoginInfo]("start")
// 增加新的条件&#xff0c;多个条件应该同时满足
.where(_.age > 1) .where(_.age < 12)
// 应用规则&#xff1a;将规则应用到数据流中
val loginPS: PatternStream[LoginInfo] &#61; CEP.pattern(loginDS,pattern)
// 获取规则的结果
val ds: DataStream[String] &#61; loginPS.select(
map &#61;>
map.toString ) ds.print() env.execute()


case class LoginInfo( id:String, name:String, age:Int, loginTime:Long
)

6 从模式中选取

创建 PatternStream 之后&#xff0c;就可以应用 select 或者 flatselect 方法&#xff0c;从检测到的事件序列中提取事件了

select() 方法需要输入一个 select function 作为参数&#xff0c;每个成功匹配的事件序列都会调用它

select() 以一个 Map[String&#xff0c;Iterable [IN]] 来接收匹配到的事件序列&#xff0c;其中 key 就是每个模式的名称&#xff0c;而 value 就是所有接收到的事件的 Iterable 类型

val loginPS: PatternStream[LoginInfo] &#61; CEP.pattern(loginDS,pattern)
val outputTag: OutputTag[String] &#61; OutputTag[String]("side-output")
val result: SingleOutputStreamOperator[CompleLogin] &#61;loginPS.select(outputTag) (pattern: Map[String, Iterable[LoginInfo]], timestamp: Long, out: Collector[TimeoutLogin]) &#61;> out.collect(TimeoutLogin()) (pattern: mutable.Map[String, Iterable[LoginInfo]], out: Collector[CompleLogin]) &#61;> out.collect(CompleLogin())
val timeoutResult: DataStream[TimeoutLogin] &#61;result.getSideOutput(outputTag)

推荐阅读
  • JUC(三):深入解析AQS
    本文详细介绍了Java并发工具包中的核心类AQS(AbstractQueuedSynchronizer),包括其基本概念、数据结构、源码分析及核心方法的实现。 ... [详细]
  • IOS Run loop详解
    为什么80%的码农都做不了架构师?转自http:blog.csdn.netztp800201articledetails9240913感谢作者分享Objecti ... [详细]
  • 在Delphi7下要制作系统托盘,只能制作一个比较简单的系统托盘,因为ShellAPI文件定义的TNotifyIconData结构体是比较早的版本。定义如下:1234 ... [详细]
  • 本地存储组件实现对IE低版本浏览器的兼容性支持 ... [详细]
  • 属性类 `Properties` 是 `Hashtable` 类的子类,用于存储键值对形式的数据。该类在 Java 中广泛应用于配置文件的读取与写入,支持字符串类型的键和值。通过 `Properties` 类,开发者可以方便地进行配置信息的管理,确保应用程序的灵活性和可维护性。此外,`Properties` 类还提供了加载和保存属性文件的方法,使其在实际开发中具有较高的实用价值。 ... [详细]
  • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
  • 如果应用程序经常播放密集、急促而又短暂的音效(如游戏音效)那么使用MediaPlayer显得有些不太适合了。因为MediaPlayer存在如下缺点:1)延时时间较长,且资源占用率高 ... [详细]
  • 本文详细介绍了 Pentaho Kettle 中 RowMetaInterface.writeMeta 方法的使用,并提供了多个代码示例,帮助开发者更好地理解和应用该方法。 ... [详细]
  • 本文详细介绍了Java反射机制的基本概念、获取Class对象的方法、反射的主要功能及其在实际开发中的应用。通过具体示例,帮助读者更好地理解和使用Java反射。 ... [详细]
  • 本文将带你快速了解 SpringMVC 框架的基本使用方法,通过实现一个简单的 Controller 并在浏览器中访问,展示 SpringMVC 的强大与简便。 ... [详细]
  • DAO(Data Access Object)模式是一种用于抽象和封装所有对数据库或其他持久化机制访问的方法,它通过提供一个统一的接口来隐藏底层数据访问的复杂性。 ... [详细]
  • 本文详细介绍了 PHP 中对象的生命周期、内存管理和魔术方法的使用,包括对象的自动销毁、析构函数的作用以及各种魔术方法的具体应用场景。 ... [详细]
  • 开机自启动的几种方式
    0x01快速自启动目录快速启动目录自启动方式源于Windows中的一个目录,这个目录一般叫启动或者Startup。位于该目录下的PE文件会在开机后进行自启动 ... [详细]
  • 本文总结了一些开发中常见的问题及其解决方案,包括特性过滤器的使用、NuGet程序集版本冲突、线程存储、溢出检查、ThreadPool的最大线程数设置、Redis使用中的问题以及Task.Result和Task.GetAwaiter().GetResult()的区别。 ... [详细]
  • 大类|电阻器_使用Requests、Etree、BeautifulSoup、Pandas和Path库进行数据抓取与处理 | 将指定区域内容保存为HTML和Excel格式
    大类|电阻器_使用Requests、Etree、BeautifulSoup、Pandas和Path库进行数据抓取与处理 | 将指定区域内容保存为HTML和Excel格式 ... [详细]
author-avatar
mobiledu2502911637
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有